Face-mask detection on KRIA¶

Description: Using a Convolutional Neural Network (CNN), this notebook detects whether a person is wearing a face mask. It can run either on a still image or on a feed from a webcam.

Model: yolo-fastest

Input: 512 x 512 resolution images

Output: Two tensors, one with bounding box coordinates, other with the confidence score of prediction

1. Prepare the overlay¶

Program the FPGA, on the KRIA board, with DPU (Deep Learning Processing Unit) Overlay file called dpu.bit

In [ ]:
# Program the KRIA FPGA with the DPU overlay bitstream (dpu.bit must be
# discoverable by pynq_dpu — typically alongside its .hwh/.xclbin files).
from pynq_dpu import DpuOverlay
overlay = DpuOverlay("dpu.bit")

2. Import in-built libraries¶

In [ ]:
import os
import time
import numpy as np
import cv2
import random
import colorsys
from matplotlib.patches import Rectangle
import matplotlib.pyplot as plt
%matplotlib inline

3. Load the pre-trained model on the overlay programmed in step-1¶

In [ ]:
overlay.load_model("pt_face-mask-detection.xmodel")

4. Some utility functions¶

In [ ]:
# YOLO anchor priors: nine (width, height) pairs, flattened here and
# reshaped to a (9, 2) float array for the decoding step.
anchor_list = [10, 13, 16, 30, 33, 23, 30, 61, 62, 45,
               59, 119, 116, 90, 156, 198, 373, 326]
anchor_float = list(map(float, anchor_list))
anchors = np.reshape(anchor_float, (-1, 2))
In [ ]:
# Get model classification information
def get_class(classes_path):
    """Return the list of class labels read from *classes_path*, one per line."""
    with open(classes_path) as fh:
        return [name.strip() for name in fh]
    
classes_path = "data/face_mask_names.txt" # path to the file listing one class name per line (Mask, No Mask)
class_names = get_class(classes_path)

# Build one visually distinct color per class by spacing hues evenly in HSV,
# converting to RGB, then scaling to 0-255 integer channels.
num_classes = len(class_names)
hsv_tuples = [(1.0 * x / num_classes, 1., 1.) for x in range(num_classes)]
colors = list(map(lambda x: colorsys.hsv_to_rgb(*x), hsv_tuples))
#creating a colors list to store colors
colors = list(map(lambda x: 
                  (int(x[0] * 255), int(x[1] * 255), int(x[2] * 255)), 
                  colors))
# Shuffle with a fixed seed so class->color assignment is reproducible,
# then reseed from the OS so later random use is not deterministic.
random.seed(0)
random.shuffle(colors)
random.seed(None)
In [ ]:
# Resize an image with unchanged aspect ratio, padding the borders gray.
def letterbox_image(image, size):
    """Fit *image* inside *size* = (w, h) preserving aspect ratio.

    The image is scaled to the largest size that fits, then centered on a
    gray (value 128) canvas of exactly (h, w, 3) uint8.
    """
    src_h, src_w, _ = image.shape
    dst_w, dst_h = size
    ratio = min(dst_w / src_w, dst_h / src_h)

    fit_w = int(src_w * ratio)
    fit_h = int(src_h * ratio)

    resized = cv2.resize(image, (fit_w, fit_h), interpolation=cv2.INTER_LINEAR)
    canvas = np.ones((dst_h, dst_w, 3), np.uint8) * 128
    top = (dst_h - fit_h) // 2
    left = (dst_w - fit_w) // 2
    canvas[top:top + fit_h, left:left + fit_w, :] = resized
    return canvas


# Image preprocessing for the DPU model input.
def pre_process(image, model_image_size):
    """Convert a BGR frame to a normalized RGB batch tensor.

    Steps: BGR->RGB channel flip, letterbox to the model input resolution
    (each side must be a multiple of 32), scale to [0, 1] float32, and add
    a leading batch dimension.
    """
    image = image[..., ::-1]  # BGR -> RGB
    src_h, src_w, _ = image.shape

    if model_image_size == (None, None):
        # No fixed size given: round the frame down to multiples of 32.
        target = (src_w - (src_w % 32), src_h - (src_h % 32))
    else:
        assert model_image_size[0] % 32 == 0, 'Multiples of 32 required'
        assert model_image_size[1] % 32 == 0, 'Multiples of 32 required'
        # model_image_size is (h, w); letterbox_image expects (w, h).
        target = tuple(reversed(model_image_size))
    boxed = letterbox_image(image, target)

    data = np.array(boxed, dtype='float32')
    data /= 255.
    return np.expand_dims(data, 0)
In [ ]:
'''boxes_and_score is calling correct_boxes and _get_feats function'''

# function to get features
def _get_feats(feats, anchors, num_classes, input_shape):
    num_anchors = len(anchors)
    anchors_tensor = np.reshape(np.array(anchors, dtype=np.float32), [1, 1, 1, num_anchors, 2])
    grid_size = np.shape(feats)[1:3]
    nu = num_classes + 5
    predictions = np.reshape(feats, [-1, grid_size[0], grid_size[1], num_anchors, nu])
    grid_y = np.tile(np.reshape(np.arange(grid_size[0]), [-1, 1, 1, 1]), [1, grid_size[1], 1, 1])
    grid_x = np.tile(np.reshape(np.arange(grid_size[1]), [1, -1, 1, 1]), [grid_size[0], 1, 1, 1])
    grid = np.concatenate([grid_x, grid_y], axis = -1)
    grid = np.array(grid, dtype=np.float32)

    box_xy = (1/(1+np.exp(-predictions[..., :2])) + grid) / np.array(grid_size[::-1], dtype=np.float32)
    box_wh = np.exp(predictions[..., 2:4]) * anchors_tensor / np.array(input_shape[::-1], dtype=np.float32)
    box_confidence = 1/(1+np.exp(-predictions[..., 4:5]))
    box_class_probs = 1/(1+np.exp(-predictions[..., 5:]))
    return box_xy, box_wh, box_confidence, box_class_probs


def correct_boxes(box_xy, box_wh, input_shape, image_shape):
    """Map letterboxed predictions back to original-image pixel coords.

    Returns boxes as (y_min, x_min, y_max, x_max) scaled by *image_shape*.
    """
    # Work in (y, x) / (h, w) order to line up with image_shape.
    centers_yx = box_xy[..., ::-1]
    sizes_hw = box_wh[..., ::-1]
    input_shape = np.asarray(input_shape, dtype=np.float32)
    image_shape = np.asarray(image_shape, dtype=np.float32)

    # Undo the letterbox: remove padding offset, then stretch back out.
    fitted = np.around(image_shape * np.min(input_shape / image_shape))
    pad = (input_shape - fitted) / 2. / input_shape
    stretch = input_shape / fitted
    centers_yx = (centers_yx - pad) * stretch
    sizes_hw = sizes_hw * stretch

    half = sizes_hw / 2.
    mins = centers_yx - half
    maxes = centers_yx + half
    boxes = np.concatenate(
        [mins[..., 0:1], mins[..., 1:2], maxes[..., 0:1], maxes[..., 1:2]],
        axis=-1)
    return boxes * np.concatenate([image_shape, image_shape], axis=-1)


def boxes_and_scores(feats, anchors, classes_num, input_shape, image_shape):
    """Decode one YOLO head into flat (N, 4) boxes and (N, classes) scores."""
    xy, wh, confidence, class_probs = _get_feats(feats, anchors, classes_num, input_shape)
    corrected = correct_boxes(xy, wh, input_shape, image_shape)
    flat_boxes = np.reshape(corrected, [-1, 4])
    # Final per-class score = objectness confidence * class probability.
    flat_scores = np.reshape(confidence * class_probs, [-1, classes_num])
    return flat_boxes, flat_scores
In [ ]:
def nms_boxes(boxes, scores):
    """Suppress non-maximal boxes (greedy NMS, IoU threshold 0.55).

    # Arguments
        boxes: ndarray, (N, 4) boxes as (x1, y1, x2, y2).
        scores: ndarray, (N,) confidence per box.

    # Returns
        keep: list of indices of boxes that survive suppression.
    """
    x1, y1 = boxes[:, 0], boxes[:, 1]
    x2, y2 = boxes[:, 2], boxes[:, 3]
    areas = (x2 - x1 + 1) * (y2 - y1 + 1)

    keep = []
    candidates = scores.argsort()[::-1]  # highest score first
    while candidates.size > 0:
        best = candidates[0]
        keep.append(best)
        rest = candidates[1:]

        # Intersection of the kept box with every remaining candidate.
        ix1 = np.maximum(x1[best], x1[rest])
        iy1 = np.maximum(y1[best], y1[rest])
        ix2 = np.minimum(x2[best], x2[rest])
        iy2 = np.minimum(y2[best], y2[rest])
        inter = np.maximum(0.0, ix2 - ix1 + 1) * np.maximum(0.0, iy2 - iy1 + 1)

        iou = inter / (areas[best] + areas[rest] - inter)
        # Keep only candidates whose overlap is at or below the threshold.
        candidates = rest[np.where(iou <= 0.55)[0]]

    return keep

# Returns box coordinates, scores, and the class (mask or no mask) for each
# detection, by calling boxes_and_scores() and nms_boxes().
def evaluate(yolo_outputs, image_shape, class_names, anchors):
    """Decode raw YOLO head outputs into final detections.

    Returns (boxes, scores, classes): per-detection pixel boxes, confidence
    scores above the 0.2 threshold, and integer class indices after per-class
    non-maximum suppression.
    """
    score_thresh = 0.2
    anchor_mask = [[6, 7, 8], [3, 4, 5], [0, 1, 2]]
    # Network input resolution recovered from the first head's grid (stride 32).
    input_shape = np.array(np.shape(yolo_outputs[0])[1:3]) * 32

    all_boxes, all_scores = [], []
    for head, mask in zip(yolo_outputs, anchor_mask):
        head_boxes, head_scores = boxes_and_scores(
            head, anchors[mask], len(class_names),
            input_shape, image_shape)
        all_boxes.append(head_boxes)
        all_scores.append(head_scores)
    boxes = np.concatenate(all_boxes, axis=0)
    box_scores = np.concatenate(all_scores, axis=0)

    keep_mask = box_scores >= score_thresh
    picked_boxes, picked_scores, picked_classes = [], [], []
    for c in range(len(class_names)):
        # Threshold, then NMS, independently for each class.
        cand_boxes = boxes[keep_mask[:, c]]
        cand_scores = box_scores[:, c][keep_mask[:, c]]
        survivors = nms_boxes(cand_boxes, cand_scores)
        picked_boxes.append(cand_boxes[survivors])
        picked_scores.append(cand_scores[survivors])
        picked_classes.append(np.full(len(survivors), c, dtype=np.int32))

    return (np.concatenate(picked_boxes, axis=0),
            np.concatenate(picked_scores, axis=0),
            np.concatenate(picked_classes, axis=0))
In [ ]:
'''Draw detection frame'''
def draw_boxes(image, boxes, scores, classes):
    """Show *image* with one labelled rectangle per detection (matplotlib).

    boxes are (top, left, bottom, right) in pixels; scores and classes are
    parallel arrays of confidences and class indices. Returns the axes.
    """
    _, ax = plt.subplots(1)
    # Matplotlib expects RGB; OpenCV frames are BGR.
    ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))

    for i, (top, left, bottom, right) in enumerate(boxes):
        width, height = right - left, bottom - top
        center_x, center_y = left + width * 0.5, top + height * 0.5
        score, class_index = scores[i], classes[i]
        label = '{}: {:.4f}'.format(class_names[class_index], score)
        # Matplotlib wants float channels in [0, 1]; module colors are 0-255.
        color = tuple(channel / 255 for channel in colors[class_index])
        ax.add_patch(Rectangle((left, top), width, height,
                               edgecolor=color, facecolor='none'))
        ax.annotate(label, (center_x, center_y), color=color, weight='bold',
                    fontsize=12, ha='center', va='center')
    return ax

Load the images for running mask detection¶

These images will be used to test whether the face-mask detection model is able to detect the mask or not.

In [ ]:
image_folder = 'img/' #path to the folder containing the test images
# Collect only the .jpg files; order follows os.listdir (not sorted).
original_images = [i for i in os.listdir(image_folder) if i.endswith("jpg")]
total_images = len(original_images) #number of test images found

Start the overlay¶

Follow these three steps to run the overlay, get the input tensors, and output tensors.

In [ ]:
# Get the runner handle for the DPU instance loaded on the overlay.
dpu = overlay.runner

# Query the runner for descriptions of its input and output tensors.
inputTensors = dpu.get_input_tensors()
outputTensors = dpu.get_output_tensors()
In [ ]:
# Shape of the DPU input tensor; element [0] is treated as the batch
# dimension by the buffer-size math below — confirm against the model.
shapeIn = tuple(inputTensors[0].dims) #get the shape of the input tensor
print(shapeIn)
In [ ]:
#get the shape of the output tensors
#this model has two output tensors:
#one carrying bounding-box predictions, the other the confidence scores
shapeOut0 = (tuple(outputTensors[0].dims)) 
shapeOut1 = (tuple(outputTensors[1].dims)) 
print(shapeOut0)
print(shapeOut1)
In [ ]:
# Per-image element count of each output tensor (total size / batch size).
outputSize0 = int(outputTensors[0].get_data_size() / shapeIn[0]) # 12675
outputSize1 = int(outputTensors[1].get_data_size() / shapeIn[0]) # 50700
print(outputSize0)
print(outputSize1)
In [ ]:
# Pre-allocate C-contiguous host buffers reused for every DPU invocation.
input_data = [np.empty(shapeIn, dtype=np.float32, order="C")]
output_data = [np.empty(shapeOut0, dtype=np.float32, order="C"), 
               np.empty(shapeOut1, dtype=np.float32, order="C")]
# Alias for the single input buffer; run()/run_webcam() write frames into it.
image = input_data[0]

Function to run mask detection on KRIA¶

This is the top function that will call other utility functions to run face mask detection on KRIA

In [ ]:
'''run() takes an image index and calls pre_process, evaluate, and draw_boxes'''
def run(image_index, display=False):
    """Run face-mask detection on one image from image_folder.

    image_index indexes into original_images (valid range 0..total_images-1).
    When display is True the detections are drawn with draw_boxes().
    Returns (boxes, scores, classes) from evaluate().

    Raises FileNotFoundError if the image cannot be read.
    """
    # Read input image; cv2.imread signals failure by returning None.
    image_path = os.path.join(image_folder, original_images[image_index])
    input_image = cv2.imread(image_path)
    if input_image is None:
        raise FileNotFoundError("Could not read image: {}".format(image_path))

    # Pre-processing: letterbox to the model's 512 x 512 input resolution.
    image_size = input_image.shape[:2]
    image_data = np.array(pre_process(input_image, (512, 512)), dtype=np.float32)

    # Copy the frame into the DPU input buffer and trigger inference.
    image[0,...] = image_data.reshape(shapeIn[1:])
    job_id = dpu.execute_async(input_data, output_data)
    dpu.wait(job_id)

    # Retrieve the two raw output tensors.
    conv_out0 = np.reshape(output_data[0], shapeOut0)
    conv_out1 = np.reshape(output_data[1], shapeOut1)
    yolo_outputs = [conv_out0, conv_out1]

    # Decode YOLO outputs into final detections.
    boxes, scores, classes = evaluate(yolo_outputs, image_size, class_names, anchors)

    if display:
        _ = draw_boxes(input_image, boxes, scores, classes)
    return boxes, scores, classes

Make a function call to run¶

In [ ]:
run(8, display=True) #change the image index from 1 to 8 to detect face mask on different images

Setting up the webcam¶

Use the following three steps to set up the webcam:

  1. Capture Video
  2. Set frame height
  3. Set frame width
In [ ]:
# Open the default camera (device 0) and configure the capture stream.
videoIn = cv2.VideoCapture(0)
videoIn.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # Disable buffering so frames stay fresh
videoIn.set(cv2.CAP_PROP_FRAME_WIDTH, 640);
videoIn.set(cv2.CAP_PROP_FRAME_HEIGHT, 480);

print("Capture device is open: " + str(videoIn.isOpened()))

Read one frame (image) from the webcam¶

In [ ]:
ret, frame = videoIn.read()

Function to run face mask detection on live webcam feed¶

Similar to the run function we used above to detect face mask, we write another top function called run_webcam to run face mask detection on webcam feed.

The only difference between run and run_webcam is the function called to draw bounding boxes on the face.

In [ ]:
def run_webcam(webcam_image, display=False):
    """Detect face masks in one webcam frame.

    Same pipeline as run(), but takes a frame directly and, when display
    is True, annotates it with OpenCV via draw_boxes_on_frame() so the
    result can be streamed back to the notebook. Returns the frame.
    """
    frame = webcam_image
    original_size = frame.shape[:2]
    # Letterbox the frame to the model's 512 x 512 input.
    tensor = np.array(pre_process(frame, (512, 512)), dtype=np.float32)

    # Copy into the DPU input buffer and run inference to completion.
    image[0,...] = tensor.reshape(shapeIn[1:])
    job_id = dpu.execute_async(input_data, output_data)
    dpu.wait(job_id)

    # Reassemble the two raw output tensors for decoding.
    yolo_outputs = [np.reshape(output_data[0], shapeOut0),
                    np.reshape(output_data[1], shapeOut1)]

    # Decode YOLO outputs into final detections.
    boxes, scores, classes = evaluate(yolo_outputs, original_size, class_names, anchors)

    if display:
        # OpenCV drawing (not matplotlib) so the frame can be re-encoded.
        frame = draw_boxes_on_frame(frame, boxes, scores, classes)
    return frame
In [ ]:
def draw_boxes_on_frame(image, boxes, scores, classes):
    """Draw detection rectangles and labels on *image* with OpenCV.

    boxes are (top, left, bottom, right) in pixels; scores and classes are
    parallel arrays. Mutates and returns *image*.
    """
    # Loop-invariant drawing settings, hoisted out of the per-box loop.
    font = cv2.FONT_HERSHEY_SIMPLEX
    label_color = (255, 0, 0)
    fontScale = 1
    thickness = 2

    for i, (top, left, bottom, right) in enumerate(boxes):
        cv2.rectangle(image, (int(left), int(top)), (int(right), int(bottom)),
                      (0, 255, 0), 2)
        score, class_index = scores[i], classes[i]
        label = '{}: {:.4f}'.format(class_names[class_index], score)
        # Anchor the label at the box center.
        center_x = left + (right - left) * 0.5
        center_y = top + (bottom - top) * 0.5
        image = cv2.putText(image, label, (int(center_x), int(center_y)), font,
                            fontScale, label_color, thickness, cv2.LINE_AA)
    return image

Code to display live webcam feed in the notebook¶

The following piece of code displays the webcam feed in the jupyter notebook and continuously reads and sends the frames to run_webcam function for detecting mask.

To stop the feed from the webcam, click on the black square button in the toolbar. If you hover over the black square button, it will show 'Interrupt the kernel'.

In [ ]:
from IPython.display import display, Image
# Placeholder display handle, updated in place with each annotated frame.
display_handle=display(None, display_id=True)
try:
    while True:
        _, frame = videoIn.read()
        frame = run_webcam(frame, display=True)
#         frame = cv2.flip(frame, 1) # if your camera reverses your image
        # JPEG-encode the annotated frame so the notebook can render it.
        _, frame = cv2.imencode('.jpeg', frame)
        display_handle.update(Image(data=frame.tobytes()))
except KeyboardInterrupt:
    # Raised by the notebook's interrupt button; fall through to cleanup.
    pass
finally:
    videoIn.release()
    display_handle.update(None)

Always run the below line of code to release the webcam device¶

In [ ]:
videoIn.release() 

To get the performance of KRIA board¶

In [ ]:
# Benchmark: run detection over every test image and report throughput.
time1 = time.time()
for i in range(total_images):
    # run() is called for its side effects; a plain loop avoids building
    # a throwaway list the way the previous list comprehension did.
    run(i)
time2 = time.time()
fps = total_images/(time2-time1)
print("Performance: {} FPS".format(fps))

Always delete the overlay and dpu object deployed on KRIA¶

In [ ]:
# Drop the overlay and runner handles so the DPU/FPGA resources are freed.
del overlay
del dpu

Challenge¶

  1. Print total objects detected in the image/frame.
  2. Ring a siren when a person is detected without a face mask
In [ ]: